package com.bieber.computer;

import com.bieber.common.Constants;
import com.bieber.common.PersonOrder;
import com.bieber.common.model.BizOrder;
import com.bieber.common.model.ConsumerPerson;
import com.bieber.common.model.Task;
import com.bieber.common.node.IgniteServer;
import com.bieber.common.compute.ComputePersonOrderTask;
import com.bieber.computer.splitter.DataSplitter;
import com.bieber.computer.splitter.OrderSplitter;
import com.bieber.computer.splitter.PersonSplitter;
import org.apache.commons.io.IOUtils;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;

import javax.cache.Cache;
import java.io.*;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by bieber on 2015/9/5.
 */
public class ComputerServer extends IgniteServer<ComputerConfig>{

    PersonSplitter personSplitter;
    OrderSplitter orderSplitter;
    Executor preparedPerson;
    Executor preparedOrder;
    int countdownSize=2;
    int personSize;
    AtomicInteger doneTaskSize = new AtomicInteger(0);
    public ComputerServer() {
        super(false);
    }
    
    private void prepare(ComputerConfig config){
        personSplitter = new PersonSplitter(config,ignite);
        orderSplitter = new OrderSplitter(config,ignite);
        preparedPerson = new Executor(personSplitter);
        preparedOrder = new Executor(orderSplitter);
        preparedPerson.start();
        preparedOrder.start();
    }
    
    private void execute(CountDownLatch countDownLatch){
        preparedPerson.execute(countDownLatch);
        preparedOrder.execute(countDownLatch);
    }
    
    @Override
    protected void doStart(ComputerConfig config) {
        if(ignite.cluster().forServers().nodes().isEmpty()){
            throw new IllegalStateException("not found any server node in current cluster");
        }
        Constants.COMMON_LOGGER.info("prepare data loading....");
        prepare(config);
        int totalTask =0;
        while(true){
            CountDownLatch countDownLatch = new CountDownLatch(countdownSize);
            execute(countDownLatch);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                throw new IllegalArgumentException("failed to prepare data",e);
            }
            if(personSplitter.isFinished()&&preparedPerson.isRunning()){
                Constants.COMMON_LOGGER.info("person data parse finished");
                preparedPerson.finished();
                countdownSize--;
            }
            if(orderSplitter.isFinished()&&preparedOrder.isRunning()){
                Constants.COMMON_LOGGER.info("order data parse finished");
                preparedOrder.finished();
                countdownSize--;
            }
            if(countdownSize==0){
                break;
            }
            IgniteCompute compute = ignite.compute().withAsync();
            if(personSize==0){
                IgniteCache<Integer,ConsumerPerson> personCache = ignite.cache(Constants.PERSON_CACHE);
                personSize = personCache.size(CachePeekMode.PRIMARY);
                if(personSize<=0){
                    throw new IllegalStateException("not found any person in cache");
                }
            }
            Constants.COMMON_LOGGER.info("total person size [{}]",personSize);
            Task task = new Task();
            task.setOrderCacheName(orderSplitter.getCacheName());
            task.setPersonSize(personSize);
            compute.execute(ComputePersonOrderTask.class,task);
            ComputeTaskFuture<Integer> future = compute.future();
            future.listen(new TaskCompletedCallBack(task));
            totalTask++;
        }
        while(true){
            if(totalTask-doneTaskSize.get()==0){
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
        Constants.COMMON_LOGGER.info("Finished compute.....");
        SqlQuery sql = new SqlQuery(PersonOrder.class, "order by amount desc limit 1000");
        IgniteCache<Integer,PersonOrder> personOrderIgniteCache = ignite.cache(Constants.PERSON_ORDER_CACHE);
        QueryCursor<Cache.Entry<Long, PersonOrder>> cursor  = personOrderIgniteCache.query(sql);
        writeResultCSV(cursor);
        stop(config);
    }
    
    private void writeResultCSV(QueryCursor<Cache.Entry<Long, PersonOrder>> cursor){
        BufferedWriter writer = null;
        try {
            writer= new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File("C:\\Users\\bieber\\working\\imdg\\result.csv"))));
            StringBuffer stringBuffer = new StringBuffer();
            Random random = new Random(System.currentTimeMillis());
            for(Cache.Entry<Long,PersonOrder> entry:cursor){
                stringBuffer.append(entry.getValue().getId()).append(",").append(entry.getValue().getName()).append(",").append(entry.getValue().getAmount()).append(",").append(entry.getValue().getCount());
                writer.write(stringBuffer.toString());
                writer.newLine();
                stringBuffer.setLength(0);
            }
            writer.flush();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeQuietly(writer);
        }
    }

    @Override
    public void stop(ComputerConfig config) {
        ignite.close();
        System.exit(0);
    }
    
    class TaskCompletedCallBack implements IgniteInClosure<IgniteFuture<Integer>>{
        
        private Task task;
        
        TaskCompletedCallBack(Task task){
            this.task=task;
        }

        @Override
        public void apply(IgniteFuture<Integer> result) {
            Constants.COMMON_LOGGER.info("task [{}] is done [{}]",task.getOrderCacheName(),result.isDone());
            Constants.COMMON_LOGGER.info("task [{}] duration [{}]ms",task.getOrderCacheName(),result.duration());
            ignite.cache(task.getOrderCacheName()).removeAll();
            doneTaskSize.incrementAndGet();
            Constants.COMMON_LOGGER.info("remove order "+task.getOrderCacheName()+" cache");
        }
    }
    
    class Executor extends Thread{
        
        private CountDownLatch countDownLatch;
        
        private volatile boolean pause=true;
        
        private volatile boolean running=true;
        
        public Executor(Runnable runnable) {
            super(runnable);
        }
        public void pause(){
            pause=true;
        }
        public void finished(){
            running=false;
        }
        public boolean isRunning(){
            return running;
        }
        public void execute(CountDownLatch countDownLatch){
            this.countDownLatch = countDownLatch;
            pause=false;
        }
        
        @Override
        public void run() {
            while(true&&running){
                while(pause){
                    //暂停提取CSV文件
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                }
                super.run();
                countDownLatch.countDown();
                pause=true;
            }
        }
    }
}
